/**
 * Copyright (c) 2015, 玛雅牛［李飞］ (lifei@wellbole.com).
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.jfinal.plugin.zbus.proto;

import com.jfinal.log.Logger;
import com.jfinal.plugin.zbus.core.AbstractSender;
import com.jfinal.plugin.zbus.core.TMsgHandler;
import org.reflections.Reflections;
import org.zbus.mq.Protocol;
import org.zbus.net.http.Message;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * @ClassName: ZbusTopic
 * @Description: Topic主题对象  
 * @since V1.0.0
 */
public class ZbusTopic {

    private static final Logger log = Logger.getLogger(ZbusTopic.class);

    public String mqName;
	public String topic;

    public ReceiverT receiverT = new ReceiverT();
    public SenderT senderT;

    //单独设置producer和sender的个数.
	public ZbusTopic init(String address,String mqName, String topic, Integer producerNum, Integer senderNum){
        this.mqName = mqName;
        this.topic = topic;
        this.senderT = new SenderT(address,producerNum,senderNum);
        return this;
    }

    //默认producer与sender一一对应
    public ZbusTopic init(String address,String mqName, String topic, Integer maxNum){
        return init(address,mqName,topic,maxNum,maxNum);
    }

    /**
     * @ClassName: RegT
     * @Description: Topic注解注册
     * @since V1.0.0
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ ElementType.TYPE })
    public @interface RegT{
        String name();
        String topic();
        boolean enable() default true;
        int reloadNum() default 1;
    }

    /**
     * @ClassName: SenderT
     * @Description: Topic泛型发送器
     * @since V1.0.0
     */
    public class SenderT<T> extends AbstractSender<T> {

        private final String address;
        private final Integer maxProducerNum;
        private final Integer maxSenderNum;

        private String inner_topic;

        /**
         * @ClassName: SenderT
         * @Description: 构建一个Topic发送器
         * @since V1.0.0
         */
        public SenderT(String address,Integer producerNum,Integer senderNum){
            this.address = address;
            this.maxProducerNum = producerNum;
            this.maxSenderNum = senderNum;
            init();
        }

        //默认只启动一个producer和一个sender
        public SenderT(){
            this("127.0.0.1",1,1);
        }

        public SenderT init(String address,String mqName, String topic) {
            init(address, mqName, Protocol.MqMode.PubSub, maxProducerNum, maxSenderNum);
            this.inner_topic = topic;
            return this;
        }

        public SenderT init() {
            return init(address,mqName,topic);
        }

        //设定topic
        @Override
        protected Message encode(T obj) {
            return super.encode(obj).setTopic(inner_topic);
        }
    }

//    public static Map<String, Map<String, TMsgHandler<?>>> tMsgHandlerTMap = new HashMap<String, Map<String, TMsgHandler<?>>>();

    public static Map<String, ReceiverT> tMsgReceiverTMap = new HashMap<String, ReceiverT>();

    public class ReceiverT {

        public Map<String, TMsgHandler<?>> tmc;
        public Integer maxConsumerNum;

        /**
         * @ClassName: register
         * @Description: 注册Topic的消息回调接口
         * @since V1.0.0
         */
//        public ReceiverT register(String mqName, String topic, TMsgHandler<?> msgHandler) {
//            // 依据mq获得 topic－TMsgHandler映射map
//            Map<String, TMsgHandler<?>> tmc = tMsgHandlerTMap.get(mqName);
//            if (null == tmc) {
//                tmc = new HashMap<String, TMsgHandler<?>>();
//            }
//            if(tmc.containsKey(topic)){
//                log.warn("(mq=" + mqName + ",topic=" + topic + ")对应的消息处理器已存在!");
//            }
//            tmc.put(topic, msgHandler);
//            tMsgHandlerTMap.put(mqName, tmc);
//            return this;
//        }

        public ReceiverT register(String mqName, String topic, Integer consumerNum,
                                  TMsgHandler<?> msgHandler) {
            // 依据mq获得receiverT的映射map
            ReceiverT receiverT = tMsgReceiverTMap.get(mqName);
            if (null == receiverT) {
                tmc = new HashMap<String, TMsgHandler<?>>();
            }
            if(tmc.containsKey(topic)){
                log.warn("(mq=" + mqName + ",topic=" + topic + ")对应的消息处理器已存在!");
            }
            tmc.put(topic, msgHandler);
            maxConsumerNum = consumerNum;
            tMsgReceiverTMap.put(mqName,this);
            return this;
        }

        /**
         * @ClassName: register
         * @Description: 注册Topic的消息回调接口
         * @since V1.0.0
         */
//        public ReceiverT register(TMsgHandler<?> msgHandler) {
//            return this.register(mqName, topic, msgHandler);
//        }

        public ReceiverT register(Integer maxConsumerNum,TMsgHandler<?> msgHandler) {
            return register(mqName, topic, maxConsumerNum, msgHandler);
        }

        public ReceiverT autoLoadTAnno(Reflections reflections){
            Set<Class<?>> topicHandlerClasses = reflections.getTypesAnnotatedWith(RegT.class);
            for (Class<?> mc : topicHandlerClasses) {
                if(!TMsgHandler.class.isAssignableFrom(mc)){
                    throw new RuntimeException(mc.getName() + " 必须继承自 TMsgHandler<T>");
                }
                RegT th = mc.getAnnotation(RegT.class);
                try {
                    if(th.enable()) {
                        TMsgHandler<?> hander = (TMsgHandler<?>) mc.newInstance();
                        register(th.name(), th.topic(), th.reloadNum(), hander);
                        log.info("通过注解自动加载Topic消息处理器( mq=" + th.name()  + ",topic=" + th.topic() + ",handler=" + mc.getName() + " )");
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e.getMessage(),e);
                }
            }
            return this;
        }

    }

}
